package com.atguigu.flink.cep;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

/**
 * Created by Smexy on 2022/12/20
 *      匹配后跳过策略:  目的减少不必要(重复)的匹配
 */
public class Demo12_Skip
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner((e, r) -> e.getTs());


        SingleOutputStreamOperator<WaterSensor> ds =
            env.readTextFile("data/cep.txt")
               .map(new WaterSensorMapFunction())
               .assignTimestampsAndWatermarks(watermarkStrategy);


        /*
             begin(final String name,
                   final AfterMatchSkipStrategy afterMatchSkipStrategy)

                 AfterMatchSkipStrategy: 匹配后的跳过策略，可以设置，将匹配的结果过滤！

                 AfterMatchSkipStrategy.noSkip(): 默认
         */
        Pattern<WaterSensor, WaterSensor> pattern = Pattern.<WaterSensor>begin("规则1",
            AfterMatchSkipStrategy.skipPastLastEvent()
            )
            .where(new SimpleCondition<WaterSensor>()
            {
                //定义匹配规则
                @Override
                public boolean filter(WaterSensor value) throws Exception {
                    return "s1".equals(value.getId());
                }
            })
            .oneOrMore()
            .followedBy("s2")
            .where(new SimpleCondition<WaterSensor>()
            {
                //定义匹配规则
                @Override
                public boolean filter(WaterSensor value) throws Exception {
                    return "s2".equals(value.getId());
                }
            })
            ;


        PatternStream<WaterSensor> patternStream = CEP.pattern(ds, pattern);


        patternStream.select(new PatternSelectFunction<WaterSensor, String>()
        {

            @Override
            public String select(Map<String, List<WaterSensor>> map) throws Exception {
                return map.toString();
            }
        })
                     .print().setParallelism(1);


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}
